Skip to content

Support VALKEYMODULE_REPLY_AGAIN in VM_UnblockClient Reply cb#3707

Draft
KarthikSubbarao wants to merge 3 commits into
valkey-io:unstablefrom
KarthikSubbarao:reblock
Draft

Support VALKEYMODULE_REPLY_AGAIN in VM_UnblockClient Reply cb#3707
KarthikSubbarao wants to merge 3 commits into
valkey-io:unstablefrom
KarthikSubbarao:reblock

Conversation

@KarthikSubbarao
Copy link
Copy Markdown
Member

Please note this is a draft PR as I am working on perf benchmarking of Valkey Core + Valkey Search. Do not review yet. I will update the status of the PR once ready.

Today, a module that blocks a client cannot keep it blocked after the reply callback is invoked. The core always unblocks the client and frees the blocked client handle after the reply callback returns. This forces modules that need to do main-thread work after unblock (like valkey-search's content resolution) to use EventLoopAddOneShot to schedule that work, adding an extra event loop iteration of latency per operation.

The fix adds a new return value VALKEYMODULE_REPLY_AGAIN (value 2). When the reply callback returns this, the core keeps the client blocked with its privdata intact. The module calls UnblockClient again later to re-trigger the reply callback. This is not a breaking change — the reply callback already returns int and existing modules return 0 or 1.

I am leaning towards this over a separate VM_ReblockClient() API (or new blocking framework) because the blocked client already has all necessary state (privdata, callbacks, timeout). No new state machine or lifecycle management needed — just a signal that says "I'm not done yet."

Please note that I need to review the core module.c code and ensure the core change correctly works with the rest of the module blocking framework. here is what I did so far and tested:

  • Temp clients (reply_client, thread_safe_ctx_client) are released and re-allocated to prevent leaks
  • bc->unblocked is reset to 0 so the timeout mechanism continues to work
  • Disconnect callback remains active and fires correctly
  • Contract: module must not write any reply before returning REPLY_AGAIN

Tested with valkey-search (10k docs, FT.SEARCH "common" LIMIT 0 10). Both before and after include the valkeysearch PausePoint + DecrementRefCount fixes from PR 1037.

Scenario: 1 TEXT field × 25 bytes (minimal content)
Clients | Before (req/s) | After (req/s) | Improvement
1       | 1,372          | 1,385         | +1%
10      | 9,174          | 9,615         | +5%
50      | 24,390         | 25,641        | +5%

Scenario: 5 TEXT fields × 1KB each (typical content workload)
Clients | Before (req/s) | After (req/s) | Improvement
1       | 588            | 1,036         | +76%
50      | 12,048         | 17,422        | +45%
200     | 11,429         | 12,136        | +6%
500     | 11,062         | 13,193        | +19%

The fix helps most when content resolution involves real work (multiple fields, larger data). With trivial content (1 small field), the event loop hop is negligible. With 5 fields × 1KB, the scheduling overhead becomes a significant fraction of per-query latency — especially at low concurrency where each query pays the full round-trip cost.

Prior to the fix, main thread is mostly idle between EventLoopAddOneShot callbacks — the scheduling hop is the bottleneck at low concurrency.

After the fix, Main thread does content fetch + reply inline in the reply callback. No scheduling hop.

Tests added to tests/modules/blockedclient.c:

  • reblock.test — re-blocks twice, replies on third unblock
  • reblock.timeout — re-blocks forever, verifies timeout callback fires
  • reblock.disconnect — re-blocks, client killed, verifies disconnect callback fires
  • CLIENT UNBLOCK on re-blocked client — verifies abort triggers timeout callback

Signed-off-by: Karthik Subbarao <karthikrs2021@gmail.com>
@KarthikSubbarao KarthikSubbarao marked this pull request as draft May 14, 2026 08:23
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 14, 2026

Review Change Stack

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 633ef42d-a6cd-4bd5-b2ff-3465751dff87

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR introduces VALKEYMODULE_REPLY_AGAIN, enabling modules to re-block clients by returning this status from reply callbacks. The core implementation modifies moduleHandleBlockedClients() to detect this return value and restart the client's blocked state instead of proceeding with normal unblock/cleanup. Comprehensive test coverage validates re-blocking behavior, timeout handling, disconnect callbacks, and interaction with CLIENT UNBLOCK commands.

Changes

Blocked Client Re-Blocking Feature

Layer / File(s) Summary
Define VALKEYMODULE_REPLY_AGAIN constant
src/valkeymodule.h
Adds the new VALKEYMODULE_REPLY_AGAIN macro (value 2) describing the re-block return code for reply callbacks.
Implement re-blocking logic in moduleHandleBlockedClients
src/module.c
Captures the reply callback return value; when it equals VALKEYMODULE_REPLY_AGAIN, reinitializes temporary reply clients, clears the unblocked marker, and continues the blocked-clients loop without invoking the normal unblock/free path.
Add test module commands for re-blocking behavior
tests/modules/blockedclient.c
Implements five new reblock.* command handlers with global callback counters, timer-driven unblock sequences, and disconnect callbacks demonstrating REPLY_AGAIN re-blocking across scenarios: basic re-blocking with eventual reply, timeout while re-blocked, and disconnect callback observation.
Add unit test coverage for re-blocking
tests/unit/moduleapi/blockedclient.tcl
Adds four TCL unit tests validating successive REPLY_AGAIN returns followed by final reply, timeout callbacks when REPLY_AGAIN persists beyond timeout, disconnect callback invocation, and CLIENT UNBLOCK timeout abort during re-blocking.

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 62.50% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: adding support for VALKEYMODULE_REPLY_AGAIN in the blocked client reply callback handling.
Description check ✅ Passed The description is comprehensive and directly related to the changeset, detailing the rationale, implementation details, performance benchmarks, and test coverage for the REPLY_AGAIN feature.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/module.c`:
- Around line 9002-9005: The code releases and re-allocates
bc->thread_safe_ctx_client across VALKEYMODULE_REPLY_AGAIN which invalidates
ctx->client returned by VM_GetThreadSafeContext and can cause use-after-free;
fix by not recycling or replacing bc->thread_safe_ctx_client in this path —
remove or skip the moduleReleaseTempClient/moduleAllocTempClient calls for
bc->thread_safe_ctx_client here and only touch bc->reply_client (or allocate a
new thread-safe client per VM_GetThreadSafeContext and ensure the context
owns/releases it) so outstanding thread-safe contexts keep a stable client
pointer.
- Around line 9002-9005: When replacing the temporary clients in the reblock
path, copy the original client's RESP mode into the newly allocated reply client
so buffered replies use the correct RESP version: after calling
moduleReleaseTempClient and moduleAllocTempClient for bc->reply_client (and
similarly for bc->thread_safe_ctx_client if needed), set bc->reply_client->resp
= bc->client->resp (or use the appropriate accessor on the new temp client) to
restore the RESP mode that was set earlier in the initial block path.

In `@tests/unit/moduleapi/blockedclient.tcl`:
- Around line 314-356: The tests rely on sleeps and racey behavior; replace
time-based waits with explicit synchronization: after starting
valkey_deferring_client and issuing reblock.disconnect or reblock.timeout, poll
r client list (the existing foreach over split $clients) in a short loop with a
timeout to wait until a line matching "*cmd=reblock.disconnect*" or
"*cmd=reblock.timeout*" yields a non-empty target_id, then immediately perform r
client kill id $target_id or r client unblock $target_id timeout respectively;
after sending unblock/kill, wait for the specific deterministic signals using r
reblock.get_disconnect_called or the exact $rd read result (rather than fixed
after N ms), and only then close $rd—this enforces the intended unblock/kill
path and removes flaky sleeps.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 639266a1-164f-440e-a2ce-4dd997e3d65c

📥 Commits

Reviewing files that changed from the base of the PR and between fdf13ca and 1aee438.

📒 Files selected for processing (4)
  • src/module.c
  • src/valkeymodule.h
  • tests/modules/blockedclient.c
  • tests/unit/moduleapi/blockedclient.tcl

Comment thread src/module.c
Comment on lines +9002 to +9005
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);
bc->reply_client = moduleAllocTempClient();
bc->thread_safe_ctx_client = moduleAllocTempClient();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Avoid recycling thread_safe_ctx_client across VALKEYMODULE_REPLY_AGAIN.

VM_GetThreadSafeContext() stores bc->thread_safe_ctx_client into ctx->client. Replacing that client here means any outstanding thread-safe contexts created before the first unblock now hold a dangling client pointer. If a worker thread keeps using the same context after returning VALKEYMODULE_REPLY_AGAIN, this turns into a use-after-free / cross-request state corruption bug.

As per coding guidelines, "Verify thread safety in threaded I/O paths in C code".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/module.c` around lines 9002 - 9005, The code releases and re-allocates
bc->thread_safe_ctx_client across VALKEYMODULE_REPLY_AGAIN which invalidates
ctx->client returned by VM_GetThreadSafeContext and can cause use-after-free;
fix by not recycling or replacing bc->thread_safe_ctx_client in this path —
remove or skip the moduleReleaseTempClient/moduleAllocTempClient calls for
bc->thread_safe_ctx_client here and only touch bc->reply_client (or allocate a
new thread-safe client per VM_GetThreadSafeContext and ensure the context
owns/releases it) so outstanding thread-safe contexts keep a stable client
pointer.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Restore the reply client's RESP mode after reblocking.

The initial block path copies bc->client->resp into bc->reply_client, but this branch allocates a fresh pooled client without doing the same. Any buffered reply emitted before the next unblock can then be serialized in the wrong RESP version.

Proposed fix
         if (reply_ret == VALKEYMODULE_REPLY_AGAIN) {
             moduleReleaseTempClient(bc->reply_client);
             moduleReleaseTempClient(bc->thread_safe_ctx_client);
             bc->reply_client = moduleAllocTempClient();
             bc->thread_safe_ctx_client = moduleAllocTempClient();
+            if (bc->client) bc->reply_client->resp = bc->client->resp;
             bc->unblocked = 0;
             pthread_mutex_lock(&moduleUnblockedClientsMutex);
             continue;
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);
bc->reply_client = moduleAllocTempClient();
bc->thread_safe_ctx_client = moduleAllocTempClient();
if (reply_ret == VALKEYMODULE_REPLY_AGAIN) {
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);
bc->reply_client = moduleAllocTempClient();
bc->thread_safe_ctx_client = moduleAllocTempClient();
if (bc->client) bc->reply_client->resp = bc->client->resp;
bc->unblocked = 0;
pthread_mutex_lock(&moduleUnblockedClientsMutex);
continue;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/module.c` around lines 9002 - 9005, When replacing the temporary clients
in the reblock path, copy the original client's RESP mode into the newly
allocated reply client so buffered replies use the correct RESP version: after
calling moduleReleaseTempClient and moduleAllocTempClient for bc->reply_client
(and similarly for bc->thread_safe_ctx_client if needed), set
bc->reply_client->resp = bc->client->resp (or use the appropriate accessor on
the new temp client) to restore the RESP mode that was set earlier in the
initial block path.

Comment on lines +314 to +356
test {REPLY_AGAIN disconnect fires disconnect callback} {
# Start reblock.disconnect on a separate client, then kill that client.
set rd [valkey_deferring_client]
$rd reblock.disconnect
after 100
# Get the client id
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.disconnect*" $line]} {
regexp {id=(\d+)} $line -> target_id
}
}
# Kill the client
if {$target_id ne ""} {
r client kill id $target_id
}
after 200
# Verify disconnect callback was called
assert_equal 1 [r reblock.get_disconnect_called]
$rd close
}

test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
# Block a client with reblock that always returns REPLY_AGAIN
# then use CLIENT UNBLOCK to abort it — should trigger timeout cb
set rd [valkey_deferring_client]
$rd reblock.timeout
after 100
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.timeout*" $line]} {
regexp {id=(\d+)} $line -> target_id
}
}
if {$target_id ne ""} {
r client unblock $target_id timeout
}
set result [$rd read]
assert_match "*Timed out*" $result
$rd close
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make the re-block integration tests deterministic and enforce the intended path.

These tests currently rely on fixed sleeps and optional client kill/unblock, which can make them flaky and can let the CLIENT UNBLOCK case pass via natural timeout instead of the unblock path.

💡 Proposed fix
@@
     test {REPLY_AGAIN disconnect fires disconnect callback} {
         # Start reblock.disconnect on a separate client, then kill that client.
         set rd [valkey_deferring_client]
         $rd reblock.disconnect
-        after 100
-        # Get the client id
-        set clients [r client list]
         set target_id ""
-        foreach line [split $clients "\n"] {
-            if {[string match "*cmd=reblock.disconnect*" $line]} {
-                regexp {id=(\d+)} $line -> target_id
-            }
-        }
-        # Kill the client
-        if {$target_id ne ""} {
-            r client kill id $target_id
-        }
-        after 200
-        # Verify disconnect callback was called
-        assert_equal 1 [r reblock.get_disconnect_called]
+        wait_for_condition 50 20 {
+            set clients [r client list]
+            set target_id ""
+            foreach line [split $clients "\n"] {
+                if {[string match "*cmd=reblock.disconnect*" $line]} {
+                    regexp {id=(\d+)} $line -> target_id
+                    break
+                }
+            }
+            $target_id ne ""
+        } else {
+            fail "Failed to find client running reblock.disconnect"
+        }
+
+        assert_equal 1 [r client kill id $target_id]
+        wait_for_condition 50 20 {
+            [r reblock.get_disconnect_called] eq 1
+        } else {
+            fail "Failed waiting for disconnect callback"
+        }
         $rd close
     }
@@
     test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
         # Block a client with reblock that always returns REPLY_AGAIN
         # then use CLIENT UNBLOCK to abort it — should trigger timeout cb
         set rd [valkey_deferring_client]
         $rd reblock.timeout
-        after 100
-        set clients [r client list]
         set target_id ""
-        foreach line [split $clients "\n"] {
-            if {[string match "*cmd=reblock.timeout*" $line]} {
-                regexp {id=(\d+)} $line -> target_id
-            }
-        }
-        if {$target_id ne ""} {
-            r client unblock $target_id timeout
-        }
+        wait_for_condition 50 20 {
+            set clients [r client list]
+            set target_id ""
+            foreach line [split $clients "\n"] {
+                if {[string match "*cmd=reblock.timeout*" $line]} {
+                    regexp {id=(\d+)} $line -> target_id
+                    break
+                }
+            }
+            $target_id ne ""
+        } else {
+            fail "Failed to find client running reblock.timeout"
+        }
+
+        assert_equal 1 [r client unblock $target_id timeout]
         set result [$rd read]
         assert_match "*Timed out*" $result
         $rd close
     }

As per coding guidelines: "Avoid timing-dependent tests; use proper synchronization (test reliability)".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
test {REPLY_AGAIN disconnect fires disconnect callback} {
# Start reblock.disconnect on a separate client, then kill that client.
set rd [valkey_deferring_client]
$rd reblock.disconnect
after 100
# Get the client id
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.disconnect*" $line]} {
regexp {id=(\d+)} $line -> target_id
}
}
# Kill the client
if {$target_id ne ""} {
r client kill id $target_id
}
after 200
# Verify disconnect callback was called
assert_equal 1 [r reblock.get_disconnect_called]
$rd close
}
test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
# Block a client with reblock that always returns REPLY_AGAIN
# then use CLIENT UNBLOCK to abort it — should trigger timeout cb
set rd [valkey_deferring_client]
$rd reblock.timeout
after 100
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.timeout*" $line]} {
regexp {id=(\d+)} $line -> target_id
}
}
if {$target_id ne ""} {
r client unblock $target_id timeout
}
set result [$rd read]
assert_match "*Timed out*" $result
$rd close
}
test {REPLY_AGAIN disconnect fires disconnect callback} {
# Start reblock.disconnect on a separate client, then kill that client.
set rd [valkey_deferring_client]
$rd reblock.disconnect
set target_id ""
wait_for_condition 50 20 {
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.disconnect*" $line]} {
regexp {id=(\d+)} $line -> target_id
break
}
}
$target_id ne ""
} else {
fail "Failed to find client running reblock.disconnect"
}
assert_equal 1 [r client kill id $target_id]
wait_for_condition 50 20 {
[r reblock.get_disconnect_called] eq 1
} else {
fail "Failed waiting for disconnect callback"
}
$rd close
}
test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
# Block a client with reblock that always returns REPLY_AGAIN
# then use CLIENT UNBLOCK to abort it — should trigger timeout cb
set rd [valkey_deferring_client]
$rd reblock.timeout
set target_id ""
wait_for_condition 50 20 {
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.timeout*" $line]} {
regexp {id=(\d+)} $line -> target_id
break
}
}
$target_id ne ""
} else {
fail "Failed to find client running reblock.timeout"
}
assert_equal 1 [r client unblock $target_id timeout]
set result [$rd read]
assert_match "*Timed out*" $result
$rd close
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/moduleapi/blockedclient.tcl` around lines 314 - 356, The tests
rely on sleeps and racey behavior; replace time-based waits with explicit
synchronization: after starting valkey_deferring_client and issuing
reblock.disconnect or reblock.timeout, poll r client list (the existing foreach
over split $clients) in a short loop with a timeout to wait until a line
matching "*cmd=reblock.disconnect*" or "*cmd=reblock.timeout*" yields a
non-empty target_id, then immediately perform r client kill id $target_id or r
client unblock $target_id timeout respectively; after sending unblock/kill, wait
for the specific deterministic signals using r reblock.get_disconnect_called or
the exact $rd read result (rather than fixed after N ms), and only then close
$rd—this enforces the intended unblock/kill path and removes flaky sleeps.

@codecov
Copy link
Copy Markdown

codecov Bot commented May 14, 2026

Codecov Report

❌ Patch coverage is 0% with 11 lines in your changes missing coverage. Please review.
✅ Project coverage is 76.51%. Comparing base (5b167f4) to head (206b17d).
⚠️ Report is 1 commits behind head on unstable.

Files with missing lines Patch % Lines
src/module.c 0.00% 11 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #3707      +/-   ##
============================================
- Coverage     76.71%   76.51%   -0.21%     
============================================
  Files           162      162              
  Lines         80662    80672      +10     
============================================
- Hits          61881    61725     -156     
- Misses        18781    18947     +166     
Files with missing lines Coverage Δ
src/module.c 25.26% <0.00%> (-0.05%) ⬇️

... and 22 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Signed-off-by: Karthik Subbarao <karthikrs2021@gmail.com>
Signed-off-by: Karthik Subbarao <karthikrs2021@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant